Assignment 03

Author
Affiliation

Sabrina Minaya Vasquez

Boston University

Published

September 24, 2025

Modified

September 21, 2025

1 Import Packages

# ---- Imports ----
# Standard library
import re

# Third-party
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio

# PySpark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    col,
    explode,
    monotonically_increasing_id,
    regexp_replace,
    split,
    transform,
    when,
)

# Set random seed for reproducibility
np.random.seed(42)

# Plotly renderer for notebook output.
# (Previously set to "svg" and then immediately overridden; keep only the
# effective final value.)
pio.renderers.default = "notebook"

2 Plotly Template

# Register a custom "nike" Plotly template: condensed-bold titles, Helvetica
# body text, a two-color colorway, and unified hover along the x axis.
_nike_layout = {
    # Title font. Note: 'family' must be a single comma-separated string,
    # NOT a list or dict.
    "title": {
        "font": {
            "family": "HelveticaNeue-CondensedBold, Helvetica, Sans-serif",
            "size": 30,
            "color": "#333",
        }
    },
    # Base font for all other chart text
    "font": {
        "family": "Helvetica Neue, Helvetica, Sans-serif",
        "size": 16,
        "color": "#333",
    },
    # Default trace colorway
    "colorway": ["#ec7424", "#a4abab"],
    # One hover label spanning every trace at a given x position
    # (extend with further layout keys as needed)
    "hovermode": "x unified",
}

# Trace defaults: bar charts label each bar with an abbreviated dollar value.
_nike_bar = go.Bar(
    texttemplate="%{value:$.2s}",
    textposition="outside",
    textfont={
        "family": "Helvetica Neue, Helvetica, Sans-serif",
        "size": 20,
        "color": "#FFFFFF",
    },
)

pio.templates["nike"] = go.layout.Template(
    layout=_nike_layout,
    # Each graph object must be wrapped in a tuple or list per trace type.
    data={"bar": [_nike_bar]},
)

3 Load Dataset

# Create (or reuse) the Spark session for this analysis.
spark = SparkSession.builder.appName("LightcastData").getOrCreate()

# Read the Lightcast postings CSV. multiLine + escape handle quoted fields
# containing embedded newlines and doubled quotes.
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("multiLine", "true")
    .option("escape", "\"")
    .csv("/home/ubuntu/assignment-03-Sabrina1211/data/lightcast_job_postings.csv")
)

# Diagnostic checks -- keep commented out when rendering the final submission.
# df.printSchema()
# df.show(5)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/21 01:04:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Stage 1:>                                                          (0 + 1) / 1]                                                                                

4 Data Preparation

# Step 1: Cast the salary and experience columns to numeric (float) types
# so that quantile and arithmetic operations work on them.
_numeric_columns = [
    "SALARY",
    "SALARY_FROM",
    "SALARY_TO",
    "MIN_YEARS_EXPERIENCE",
    "MAX_YEARS_EXPERIENCE",
]
for _name in _numeric_columns:
    df = df.withColumn(_name, col(_name).cast("float"))


# Step 2: Computing medians for salary columns

def compute_median(sdf, col_name):
    """Return the approximate median of column *col_name* in *sdf*.

    Uses Spark's approxQuantile with a 1% relative error. Returns None
    when the column has no non-null values (approxQuantile then yields
    an empty list).
    """
    quantiles = sdf.approxQuantile(col_name, [0.5], 0.01)
    if not quantiles:
        return None
    return quantiles[0]

# Step 2: Compute medians for the salary columns.
median_from = compute_median(df, "SALARY_FROM")
median_to = compute_median(df, "SALARY_TO")
median_salary = compute_median(df, "SALARY")

print("Medians:", median_from, median_to, median_salary)

# Step 3: Impute missing salary bounds with their medians; experience columns
# are intentionally left un-imputed. Skip any column whose median is None
# (an all-null column) -- passing None to fillna() would raise.
_impute_values = {
    name: value
    for name, value in (("SALARY_FROM", median_from), ("SALARY_TO", median_to))
    if value is not None
}
if _impute_values:
    df = df.fillna(_impute_values)

# Step 4: Average of the (now imputed) salary bounds.
df = df.withColumn("Average_Salary", (col("SALARY_FROM") + col("SALARY_TO")) / 2)

# Step 5: Keep only the columns needed downstream.
export_cols = [
    "EDUCATION_LEVELS_NAME",
    "REMOTE_TYPE_NAME",
    "MAX_YEARS_EXPERIENCE",
    "Average_Salary",
    "SALARY",
    "LOT_V6_SPECIALIZED_OCCUPATION_NAME",
]

df_selected = df.select(*export_cols)

# Step 6: Save the cleaned data; create the target directory if missing so
# to_csv() cannot fail with FileNotFoundError.
import os
os.makedirs("./data", exist_ok=True)
pdf = df_selected.toPandas()
pdf.to_csv("./data/lightcast_cleaned.csv", index=False)

print("Data cleaning complete. Rows retained:", len(pdf))
[Stage 2:>                                                          (0 + 1) / 1]                                                                                [Stage 3:>                                                          (0 + 1) / 1]                                                                                [Stage 4:>                                                          (0 + 1) / 1]                                                                                
Medians: 87295.0 130042.0 115024.0
[Stage 5:>                                                          (0 + 1) / 1]                                                                                
Data cleaning complete. Rows retained: 72498

5 Salary Distribution by Employment Type

# Pull employment type + salary into pandas, dropping missing/zero salaries.
pdf = (
    df.select("EMPLOYMENT_TYPE_NAME", col("SALARY").cast("double").alias("SALARY"))
      .filter( (col("SALARY").isNotNull()) & (col("SALARY") > 0) )
      .toPandas()
)

# Clean labels: fill missing values, strip non-ASCII characters and
# surrounding whitespace so category names render consistently.
pdf["EMPLOYMENT_TYPE_NAME"] = (
    pdf["EMPLOYMENT_TYPE_NAME"].astype("string").fillna("Unknown")
      .str.replace(r"[^\x00-\x7F]+", "", regex=True).str.strip()
)

# Order categories by descending median salary so boxes read left-to-right.
sorted_employment_types = (
    pdf.groupby("EMPLOYMENT_TYPE_NAME")["SALARY"].median()
       .sort_values(ascending=False).index
)
pdf["EMPLOYMENT_TYPE_NAME"] = pd.Categorical(pdf["EMPLOYMENT_TYPE_NAME"],
                                             categories=sorted_employment_types,
                                             ordered=True)

# Box plot: show only outlier points to keep the figure readable.
fig = px.box(
    pdf,
    x="EMPLOYMENT_TYPE_NAME",
    y="SALARY",
    title="Salary Distribution by Employment Type",
    color_discrete_sequence=["#EF553B"],
    points="outliers"
)

fig.update_layout(
    xaxis=dict(title="Employment Type", categoryorder="array",
               categoryarray=sorted_employment_types.tolist(),
               tickfont=dict(size=18)),
    yaxis=dict(title="Salary (K $)", range=[0, 500000],
               tickvals=[0, 50_000, 100_000, 150_000, 200_000, 250_000, 300_000, 350_000, 400_000, 450_000, 500_000],
               ticktext=["0","50K","100K","150K","200K","250K","300K","350K","400K","450K","500K"]),
    font=dict(family="Arial", size=16),
    plot_bgcolor="white",
    paper_bgcolor="white",
    showlegend=False,
    height=500, width=850
)

fig.show()

# Create the output directory if missing -- write_html/write_image raise
# when the destination directory does not exist.
import os
os.makedirs("output", exist_ok=True)
fig.write_html("output/DistributionEmploymentType.html")
fig.write_image("output/DistributionEmploymentType.svg", width=850, height=500, scale=1)
[Stage 6:>                                                          (0 + 1) / 1]                                                                                

6 Salary Distribution by Industry

# Pull industry (2-digit NAICS sector name) and salary into pandas.
pdf = df.select("NAICS2_NAME", "SALARY").toPandas()

fig = px.box(
    pdf,
    x="NAICS2_NAME",
    y="SALARY",
    title="Salary Distribution by Industry",
    color_discrete_sequence=["#EF553B"]
)

# Apply the custom template registered above; change to "plotly_white"
# if the "nike" template isn't available in this session.
fig.update_layout(template="nike")

# Rotate x-axis labels for readability (industry names are long).
fig.update_xaxes(tickangle=45)

fig.show()

# Create the output directory if missing -- write_html/write_image raise
# when the destination directory does not exist.
import os
os.makedirs("output", exist_ok=True)
fig.write_html("output/DistributionIndustry.html")
fig.write_image("output/DistributionIndustry.svg", width=850, height=500, scale=1)